Basic Configuration

Required installations

  1. Java
  2. Anaconda
  3. Apache Spark
  4. Apache Hadoop
    • only winutils.exe is required
    • download from https://github.com/steveloughran/winutils
    • create a new folder, e.g. C:\hadoop\bin, and place winutils.exe in it
    • run Command Prompt as Administrator and execute
      winutils.exe chmod 777 \tmp\hive
      
  5. Findspark
    • a utility to locate and initialize pyspark
    • install using
      conda install -c conda-forge findspark
      

Environment variables

  • HADOOP_HOME = path\to\hadoop
  • SPARK_HOME = path\to\spark
  • JAVA_HOME = path\to\JavaJDK
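
These variables can also be set from within Python before calling findspark.init(), which is convenient inside a notebook. A minimal sketch, assuming the placeholder paths below are replaced with the actual install locations:

import os

# placeholder paths -- substitute the real JDK, Hadoop and Spark locations
os.environ["JAVA_HOME"] = r"C:\path\to\JavaJDK"
os.environ["HADOOP_HOME"] = r"C:\hadoop"        # folder containing bin\winutils.exe
os.environ["SPARK_HOME"] = r"C:\path\to\spark"

findspark.init() (run in the next cell) then picks up SPARK_HOME from the environment.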

Import base packages


In [1]:
import sys, os, shutil
import findspark
# use findspark to locate and initialize pyspark before importing pyspark
findspark.init()
import pyspark

Check environment


In [2]:
print("Python Version:", sys.version)
print("Spark Version:", pyspark.__version__)


Python Version: 3.6.3 |Anaconda, Inc.| (default, Oct 15 2017, 03:27:45) [MSC v.1900 64 bit (AMD64)]
Spark Version: 2.1.1+hadoop2.7

Example 1: Estimate Pi using Monte Carlo Sampling


In [3]:
from random import random
from operator import add
from pyspark.sql import SparkSession
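
# Monte Carlo estimate of pi: a point drawn uniformly from the square [-1, 1] x [-1, 1]
# falls inside the unit circle with probability pi/4, so 4 * (hits / num_samples) approximates pi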

def f(_):
    x = random() * 2 - 1
    y = random() * 2 - 1
    return 1 if x ** 2 + y ** 2 <= 1 else 0
    
spark = SparkSession \
        .builder \
        .appName("PythonPi") \
        .getOrCreate()

partitions = 10
num_samples = 10000

count = spark.sparkContext.parallelize(range(1, num_samples + 1), partitions).map(f).reduce(add)

print("Pi is roughly %f" % (4.0 * count / num_samples))

spark.stop()


Pi is roughly 3.132000

Example 2A: Perform Binary Classification using Decision Tree

Adapted from https://github.com/apache/spark/blob/master/examples/src/main/python/ml/decision_tree_classification_example.py


In [4]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

from pyspark.sql import SparkSession

spark = SparkSession\
        .builder\
        .appName("DecisionTreeBinaryClassificationExample")\
        .getOrCreate()

data = spark.read.format("libsvm").load("data/sample_libsvm_data.txt")

# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)

# Automatically identify categorical features, and index them.
# We specify maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3], 1234)

# Create a DecisionTree model.
dt = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures")

# Chain indexers and tree in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, dt])

# Train model.  This also runs the indexers.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "indexedLabel", "features").show(5)

# Select (prediction, true label) and compute accuracy
evaluator = MulticlassClassificationEvaluator(
                labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = " + str(accuracy))

treeModel = model.stages[2]

# summary only
print(treeModel.toDebugString)

spark.stop()


+----------+------------+--------------------+
|prediction|indexedLabel|            features|
+----------+------------+--------------------+
|       1.0|         1.0|(692,[95,96,97,12...|
|       1.0|         1.0|(692,[98,99,100,1...|
|       1.0|         1.0|(692,[100,101,102...|
|       1.0|         1.0|(692,[124,125,126...|
|       1.0|         1.0|(692,[127,128,129...|
+----------+------------+--------------------+
only showing top 5 rows

Test set accuracy = 0.9642857142857143
DecisionTreeClassificationModel (uid=DecisionTreeClassifier_4141bc281b063407d90d) of depth 1 with 3 nodes
  If (feature 434 <= 0.0)
   Predict: 1.0
  Else (feature 434 > 0.0)
   Predict: 0.0

Example 2B: Perform Multiclass Classification using Decision Tree


In [5]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

from pyspark.sql import SparkSession

spark = SparkSession\
        .builder\
        .appName("DecisionTreeMulticlassClassificationExample")\
        .getOrCreate()

data = spark.read.format("libsvm").load("data/sample_multiclass_classification_data.txt")

# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)

# Automatically identify categorical features, and index them.
# We specify maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)

# Split the data into training and test sets (40% held out for testing)
(trainingData, testData) = data.randomSplit([0.6, 0.4], 1234)

# Create a DecisionTree model.
dt = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures")

# Chain indexers and tree in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, dt])

# Train model.  This also runs the indexers.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "indexedLabel", "features").show(5)

# Select (prediction, true label) and compute accuracy
evaluator = MulticlassClassificationEvaluator(
                labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = " + str(accuracy))

treeModel = model.stages[2]

# summary only
print(treeModel.toDebugString)

spark.stop()


+----------+------------+--------------------+
|prediction|indexedLabel|            features|
+----------+------------+--------------------+
|       2.0|         1.0|(4,[0,1,2,3],[-0....|
|       2.0|         1.0|(4,[0,1,2,3],[-0....|
|       1.0|         1.0|(4,[0,1,2,3],[-0....|
|       2.0|         1.0|(4,[0,1,2,3],[-0....|
|       1.0|         1.0|(4,[0,1,2,3],[-0....|
+----------+------------+--------------------+
only showing top 5 rows

Test set accuracy = 0.9215686274509803
DecisionTreeClassificationModel (uid=DecisionTreeClassifier_4ef0987fa2e49c6e229a) of depth 4 with 13 nodes
  If (feature 2 <= -0.694915)
   Predict: 0.0
  Else (feature 2 > -0.694915)
   If (feature 2 <= 0.322034)
    If (feature 3 <= 0.25)
     Predict: 2.0
    Else (feature 3 > 0.25)
     If (feature 0 <= -0.111111)
      Predict: 2.0
     Else (feature 0 > -0.111111)
      Predict: 1.0
   Else (feature 2 > 0.322034)
    If (feature 3 <= 0.25)
     If (feature 0 <= -0.0555556)
      Predict: 2.0
     Else (feature 0 > -0.0555556)
      Predict: 1.0
    Else (feature 3 > 0.25)
     Predict: 1.0

Example 3: Perform Multiclass Classification using Multilayer Perceptron

Adapted from https://github.com/apache/spark/blob/master/examples/src/main/python/ml/multilayer_perceptron_classification.py


In [6]:
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

from pyspark.sql import SparkSession

spark = SparkSession\
        .builder\
        .appName("multilayer_perceptron_classification_example")\
        .getOrCreate()
        
# Load training data
data = spark.read.format("libsvm").load("data/sample_multiclass_classification_data.txt")

# Split the data into train and test
splits = data.randomSplit([0.6, 0.4], 1234)
train = splits[0]
test = splits[1]

# specify layers for the neural network:
# input layer of size 4 (features), two intermediate layers of size 5 and 4,
# and output of size 3 (classes)
layers = [4, 5, 4, 3]

# create the trainer and set its parameters
trainer = MultilayerPerceptronClassifier(maxIter=100, layers=layers, blockSize=128, seed=1234)

# train the model
model = trainer.fit(train)

# compute accuracy on the test set
predictions = model.transform(test)

# Select example rows to display.
predictions.select("prediction", "label", "features").show(5)

# Select (prediction, true label) and compute accuracy
evaluator = MulticlassClassificationEvaluator(
                labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = " + str(accuracy))

spark.stop()


+----------+-----+--------------------+
|prediction|label|            features|
+----------+-----+--------------------+
|       2.0|  0.0|(4,[0,1,2,3],[-0....|
|       0.0|  0.0|(4,[0,1,2,3],[-0....|
|       0.0|  0.0|(4,[0,1,2,3],[-0....|
|       2.0|  0.0|(4,[0,1,2,3],[-0....|
|       2.0|  0.0|(4,[0,1,2,3],[-0....|
+----------+-----+--------------------+
only showing top 5 rows

Test set accuracy = 0.9019607843137255

Example 4: End-to-End Workflow using Decision Tree

Using mushroom dataset from https://archive.ics.uci.edu/ml/datasets/Mushroom

This example describes the basic workflow consisting of the following phases:

  1. Prepare Spark Environment
  2. Prepare Data
    1. Read Data
    2. Review Data
    3. Transform Data
      1. Transform Label
      2. Transform Features
      3. Assemble Features
  3. Create Train-Test Split
  4. Create Machine Learning Object
  5. Create Pipeline
  6. Train Machine Learning Object
  7. Evaluate Trained Machine Learning Object
    1. Perform Predictions
    2. Compute Accuracy
    3. Display Trained Model
  8. Stop Spark Session

1. Prepare Spark Environment


In [7]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer, VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

from pyspark.sql import SparkSession

spark = SparkSession\
        .builder\
        .appName("DecisionTreeBinaryClassificationExampleOnMushrooms")\
        .getOrCreate()

2. Prepare Data

2.A. Read Data

  • data format: csv
  • header row: true
  • infer schema automatically: true

using https://spark.apache.org/docs/2.1.1/api/python/pyspark.sql.html#module-pyspark.sql


In [8]:
# returns a DataFrame
df = spark.read.format("csv")\
            .option("header", "true")\
            .option("inferSchema", "true")\
            .load("data/mushrooms.csv")

2.B. Review Data

Review the row count, schema, summary statistics, and a sample row.


In [9]:
print("Number of data rows:", df.count())


Number of data rows: 8124

In [10]:
df.printSchema() # or df.dtypes or df.columns


root
 |-- class: string (nullable = true)
 |-- cap-shape: string (nullable = true)
 |-- cap-surface: string (nullable = true)
 |-- cap-color: string (nullable = true)
 |-- bruises: string (nullable = true)
 |-- odor: string (nullable = true)
 |-- gill-attachment: string (nullable = true)
 |-- gill-spacing: string (nullable = true)
 |-- gill-size: string (nullable = true)
 |-- gill-color: string (nullable = true)
 |-- stalk-shape: string (nullable = true)
 |-- stalk-root: string (nullable = true)
 |-- stalk-surface-above-ring: string (nullable = true)
 |-- stalk-surface-below-ring: string (nullable = true)
 |-- stalk-color-above-ring: string (nullable = true)
 |-- stalk-color-below-ring: string (nullable = true)
 |-- veil-type: string (nullable = true)
 |-- veil-color: string (nullable = true)
 |-- ring-number: string (nullable = true)
 |-- ring-type: string (nullable = true)
 |-- spore-print-color: string (nullable = true)
 |-- population: string (nullable = true)
 |-- habitat: string (nullable = true)


In [11]:
df.describe("class", "cap-shape", "cap-surface", "cap-color").show()


+-------+-----+---------+-----------+---------+
|summary|class|cap-shape|cap-surface|cap-color|
+-------+-----+---------+-----------+---------+
|  count| 8124|     8124|       8124|     8124|
|   mean| null|     null|       null|     null|
| stddev| null|     null|       null|     null|
|    min|    e|        b|          f|        b|
|    max|    p|        x|          y|        y|
+-------+-----+---------+-----------+---------+


In [12]:
df.first() # or df.head(1) or df.show(1)


Out[12]:
Row(class='p', cap-shape='x', cap-surface='s', cap-color='n', bruises='t', odor='p', gill-attachment='f', gill-spacing='c', gill-size='n', gill-color='k', stalk-shape='e', stalk-root='e', stalk-surface-above-ring='s', stalk-surface-below-ring='s', stalk-color-above-ring='w', stalk-color-below-ring='w', veil-type='p', veil-color='w', ring-number='o', ring-type='p', spore-print-color='k', population='s', habitat='u')

2.C. Transform Data

This is a three-step process:

(a) Transform label
(b) Transform features
(c) Assemble features

  • pyspark.ml.feature.StringIndexer: a label indexer that maps a string column of labels to an ML column of label indices.
  • pyspark.ml.feature.VectorIndexer: for indexing categorical feature columns in a dataset of Vector.
  • pyspark.ml.feature.VectorAssembler: a transformer that combines a given list of columns into a single vector column

using https://spark.apache.org/docs/2.1.1/api/python/pyspark.ml.html#module-pyspark.ml.feature
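
As a quick illustration of what StringIndexer and VectorAssembler do, here is a small sketch on a hypothetical two-column DataFrame (separate from the mushroom data; it reuses the spark session and imports from In [7]):

toy = spark.createDataFrame([("p", "x"), ("e", "x"), ("e", "b")], ["class", "cap-shape"])

# StringIndexer assigns 0.0 to the most frequent value ("x") and 1.0 to "b"
indexed = StringIndexer(inputCol="cap-shape", outputCol="capShapeIndex").fit(toy).transform(toy)

# VectorAssembler packs the chosen numeric columns into a single vector column
VectorAssembler(inputCols=["capShapeIndex"], outputCol="features").transform(indexed).show()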

2.C.a. Transform Label

Using StringIndexer to index labels, adding metadata to the label column


In [13]:
labelIndexer = StringIndexer(inputCol="class", outputCol="indexedLabel")

2.C.b. Transform Features

Using StringIndexer on each feature column, since VectorIndexer cannot index string-valued columns directly


In [14]:
categorical_columns = df.columns[1:]

featureIndexers = [StringIndexer(inputCol=col, outputCol='stringindexed_' + col) for col in categorical_columns]

2.C.c. Assemble Features

Using VectorAssembler to combine features in a single vector column


In [15]:
inputFeatures = ['stringindexed_' + col for col in categorical_columns]

assembler = VectorAssembler(
                inputCols=inputFeatures,
                outputCol="indexedFeatures")

3. Create Train-Test Split

  • Split the data into training and test sets (30% held out for testing)
  • Use seed=1234 for reproducibility

using https://spark.apache.org/docs/2.1.1/api/python/pyspark.sql.html#pyspark.sql.DataFrame.randomSplit


In [16]:
(trainingData, testData) = df.randomSplit([0.7, 0.3], 1234)

4. Create Machine Learning Object

Create a DecisionTreeClassifier that trains on the indexed label and assembled feature columns.


In [17]:
clf = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures")

5. Create Pipeline

Chain indexers and model in a Pipeline

  1. labelIndexer
  2. featureIndexers
  3. assembler
  4. clf

using https://spark.apache.org/docs/2.1.1/api/python/pyspark.ml.html#pyspark.ml.Pipeline


In [18]:
pipeline = Pipeline(stages=[labelIndexer] + featureIndexers + [assembler, clf])

6. Train Machine Learning Object

  • Execute the stages in the pipeline, including featurization and model training, on the training data using fit

using https://spark.apache.org/docs/2.1.1/api/python/pyspark.ml.html#pyspark.ml.Pipeline.fit


In [19]:
model = pipeline.fit(trainingData)

7. Evaluate Trained Machine Learning Object

(1) Perform predictions against the test data by using transform

using https://spark.apache.org/docs/2.1.1/api/python/pyspark.ml.html#pyspark.ml.PipelineModel.transform


In [20]:
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "indexedLabel", "indexedFeatures").show(5)


+----------+------------+--------------------+
|prediction|indexedLabel|     indexedFeatures|
+----------+------------+--------------------+
|       0.0|         0.0|(22,[0,1,2,6,8,9,...|
|       0.0|         0.0|(22,[0,1,2,6,8,9,...|
|       0.0|         0.0|(22,[0,1,2,6,8,9,...|
|       0.0|         0.0|(22,[0,1,2,6,8,9,...|
|       0.0|         0.0|(22,[0,1,2,6,8,9,...|
+----------+------------+--------------------+
only showing top 5 rows


(2) Compute the accuracy of the predictions by using MulticlassClassificationEvaluator


In [21]:
evaluator = MulticlassClassificationEvaluator(
                labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = " + str(accuracy))


Test set accuracy = 0.9987557030277893

(3) Display the trained decision tree model (the last stage of the fitted pipeline)


In [22]:
treeModel = model.stages[-1] # last stage in Pipeline
# summary only
print(treeModel.toDebugString)


DecisionTreeClassificationModel (uid=DecisionTreeClassifier_4a6caa8c22c7ca2acac7) of depth 5 with 13 nodes
  If (feature 4 in {0.0,4.0,5.0,8.0})
   If (feature 19 in {0.0,1.0,2.0,3.0,5.0,6.0,7.0,8.0})
    If (feature 14 in {0.0,1.0,2.0,3.0,5.0,6.0})
     If (feature 14 in {0.0,1.0,2.0,5.0,6.0})
      If (feature 21 in {0.0,1.0,2.0,4.0,5.0,6.0})
       Predict: 0.0
      Else (feature 21 not in {0.0,1.0,2.0,4.0,5.0,6.0})
       Predict: 0.0
     Else (feature 14 not in {0.0,1.0,2.0,5.0,6.0})
      If (feature 10 in {0.0})
       Predict: 0.0
      Else (feature 10 not in {0.0})
       Predict: 1.0
    Else (feature 14 not in {0.0,1.0,2.0,3.0,5.0,6.0})
     Predict: 1.0
   Else (feature 19 not in {0.0,1.0,2.0,3.0,5.0,6.0,7.0,8.0})
    Predict: 1.0
  Else (feature 4 not in {0.0,4.0,5.0,8.0})
   Predict: 1.0


In [23]:
spark.stop()